package org.databandtech.flink.demo;

import java.math.BigDecimal;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.databandtech.flink.sink.SinkToMySQL;
import org.databandtech.flink.source.SourceFromMySQL;

public class SourceAndSinkByMySQL {

	public static void main(String[] args) {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		
		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,3000));

		String[] columnsRead = new String[] {"name","age"};//定義字段名
        String url = "jdbc:mysql://localhost:3307/flink?useUnicode=true&characterEncoding=utf-8&useSSL=false";
        String user = "root";
        String pass = "mysql";
        String sqlRead = "select name,age from user";
        String sqlWrite = "insert into databand_usermoney(username,age,bonus)values(?,?,?);";
		
        //数据读取
		DataStreamSource<Tuple2<String,Integer>> userDataStreamSource = env.addSource(
						new SourceFromMySQL(url,user,pass,columnsRead,sqlRead));
		userDataStreamSource.print();
		
		//数据写入
		//map 規則是30歲以下的獎金 1000，30-60嵗獎金2000，其餘的3000
		SingleOutputStreamOperator<Tuple3<String,Integer,BigDecimal>> dataWriteStream = 
						userDataStreamSource.map(new MapTransformation());
		dataWriteStream.print();
		dataWriteStream.addSink( new SinkToMySQL(url,user,pass,sqlWrite) );

				
		try {
			env.execute("ok-mysql");
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	public static class MapTransformation 
			implements MapFunction<Tuple2<String,Integer>,Tuple3<String,Integer,BigDecimal>> {

		private static final long serialVersionUID = 1L;

		@Override
		public Tuple3<String, Integer, BigDecimal> map(Tuple2<String, Integer> in) throws Exception {
			Tuple3<String, Integer, BigDecimal> result = new Tuple3<String, Integer, BigDecimal>();
			result.setField(in.f0, 0);
			result.setField(in.f1, 1);
			
			BigDecimal money = BigDecimal.valueOf(0);
			if (in.f1< 30)
				money = BigDecimal.valueOf(1000);
			else if (in.f1>= 30 && in.f1< 60)
				money = BigDecimal.valueOf(2000);
			else
				money = BigDecimal.valueOf(3000);
				
			result.setField(money, 2);
			
			return null;
		}
	}

}
